package com.meta.api.flink.aggregate;

import org.apache.flink.table.functions.AggregateFunction;

/**
 * Boolean transition counting aggregate: counts how many times consecutive
 * input values switch from 0 to 1, or from 1 to 0.
 */
public class BooleanTrendSummationFunctions {

    /** Name of the aggregate function. */
    public static final String BOOLTRENDSUM = "BOOLTRENDSUM";

    /**
     * Accumulator holding the previously seen value and the running count of
     * matching transitions.
     */
    public static class ChooseHitAccum {
        /** Most recently accumulated value; {@code null} until a value has been seen. */
        public Double lastValue = null;
        /** Number of matching transitions counted so far. */
        public long number = 0L;
    }

    /**
     * Aggregate function that counts 0-to-1 or 1-to-0 transitions between
     * consecutive accumulated values.
     *
     * <pre>{@code
     * // register function
     * StreamTableEnvironment tEnv = ...
     * tEnv.registerFunction("BOOLTRENDSUM",
     *         new BooleanTrendSummationFunctions.BooleanTrendSummationAvg());
     * // use function (first argument: true counts 0->1, false counts 1->0)
     * tEnv.sqlQuery("SELECT user, BOOLTRENDSUM(true, number) AS jumps FROM userScores GROUP BY user");
     * }</pre>
     */
    public static class BooleanTrendSummationAvg extends AggregateFunction<Long, ChooseHitAccum> {

        /**
         * Creates an empty accumulator: no previous value and a count of zero.
         *
         * @return a fresh accumulator
         */
        @Override
        public ChooseHitAccum createAccumulator() {
            return new ChooseHitAccum();
        }

        /**
         * Returns the number of transitions counted so far.
         *
         * @param acc the accumulator to read
         * @return the current transition count
         */
        @Override
        public Long getValue(ChooseHitAccum acc) {
            return acc.number;
        }

        /**
         * Accumulates one value, incrementing the count when the step from the
         * previously accumulated value to this one is the requested transition.
         *
         * <p>The first value only seeds the accumulator and is never counted.
         * A {@code null} value never counts as a transition; it clears the
         * remembered value, so the next non-null value is treated as a first
         * value again.
         *
         * @param acc   the accumulator to update
         * @param type  transition to count: {@code true} for 0 to 1,
         *              {@code false} for 1 to 0
         * @param value the current value; may be {@code null}
         */
        public void accumulate(ChooseHitAccum acc, boolean type, Double value) {
            // Compare only when both samples exist; unboxing a null Double would throw.
            if (acc.lastValue != null && value != null) {
                double previous = acc.lastValue;
                double current = value;
                boolean rising = previous == 0 && current == 1;
                boolean falling = previous == 1 && current == 0;
                if (type ? rising : falling) {
                    acc.number++;
                }
            }
            // Remember the current value (possibly null) for the next comparison.
            acc.lastValue = value;
        }

    }
}
